/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * This file may have been modified by CloudWeGo authors. All CloudWeGo
 * Modifications are Copyright 2021 CloudWeGo Authors.
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cloudwego/netpoll"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"

	"github.com/cloudwego/kitex/pkg/gofunc"
	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
	"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc/grpcframe"
	"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc/syscall"
	"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
	"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/peer"
	"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
	"github.com/cloudwego/kitex/pkg/utils"
)

// ticker drives the closeStreamTask that every http2Client registers in
// newHTTP2Client. On each tick a task closes the streams of its transport
// whose context has already been cancelled.
// The interval is fixed at 5s: streaming QPS is generally not too high. If
// more timely cleanup is required, consider making the interval configurable.
// A Sync SharedTicker is used to reduce the goroutine overhead in a
// multi-connection scenario.
var ticker = utils.NewSyncSharedTicker(5 * time.Second)

// http2Client implements the ClientTransport interface with HTTP2.
//
// Instances are built by newHTTP2Client, which also starts the reader, the
// optional keepalive goroutine and the loopy writer for the connection.
type http2Client struct {
	// Connection identity and lifetime. ctx/cancel are created with
	// context.WithCancel in newHTTP2Client; cancel is invoked by Close.
	lastRead   int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx        context.Context
	cancel     context.CancelFunc
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	scheme     string

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}

	// framer is created by newFramer on top of conn and is used to write the
	// initial SETTINGS / WINDOW_UPDATE frames during construction.
	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	// Do not access controlBuf with mu held.
	controlBuf *controlBuffer
	fc         *trInFlow

	// kp holds the keepalive parameters after defaults were applied;
	// keepaliveEnabled is true when kp.Time is not Infinity.
	kp               ClientKeepalive
	keepaliveEnabled bool

	// initialWindowSize is the limit given to the inbound flow control of each
	// new stream and the value advertised via SETTINGS_INITIAL_WINDOW_SIZE.
	// updateFlowControl rewrites it from inside a controlBuf callback.
	initialWindowSize uint32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	// bdpEst is non-nil only when neither window size was fixed through
	// ConnectOptions (dynamic window), see newHTTP2Client.
	bdpEst *bdpEstimator

	// Stream admission bookkeeping. In this file these fields are only touched
	// from callbacks handed to controlBuf.executeAndPut, not under mu.
	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32

	// Do not access controlBuf with mu held.
	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
	prevGoAwayID uint32

	// goAwayReason is derived from the first GOAWAY frame by setGoAwayReason.
	goAwayReason GoAwayReason
	// A condition variable used to signal when the keepalive goroutine should
	// go dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. And
	// since the number of active streams is guarded by the above mutex, we use
	// the same for this condition variable as well.
	kpDormancyCond *sync.Cond
	// A boolean to track whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool
	onGoAway  func(GoAwayReason)
	onClose   func()

	// bufferPool supplies the buffers that handleData copies DATA payloads
	// into; the stream reader hands them back through bufferPool.put.
	bufferPool *bufferPool
}

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
//
// Parameters:
//   - ctx: parent context; the transport derives its own cancellable context
//     from it, which is cancelled when construction fails or on Close.
//   - conn: the already established connection to the server.
//   - opts: window sizes, buffer sizes, keepalive parameters, TLS config and
//     the maximum header list size; zero values fall back to package defaults.
//   - remoteService: only used to label the goroutines started here.
//   - onGoAway: invoked when the first GOAWAY frame is received.
//   - onClose: invoked from Close; it is wrapped so that the transport's
//     closeStreamTask is also removed from the shared ticker.
//
// Once the transport object exists, every failure path tears it down through
// t.Close so that the ticker task registered here does not outlive it.
func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
	remoteService string, onGoAway func(GoAwayReason), onClose func(),
) (_ *http2Client, err error) {
	scheme := "http"
	if opts.TLSConfig != nil {
		scheme = "https"
	}
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// err is the named result, so this fires for every failing return.
		if err != nil {
			cancel()
		}
	}()

	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != Infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}

	// The window is adjusted dynamically (BDP estimation) unless the caller
	// pinned either the connection or the stream window size.
	dynamicWindow := true
	icwz := initialWindowSize
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}

	writeBufSize := defaultWriteBufferSize
	readBufSize := defaultReadBufferSize
	if opts.WriteBufferSize > 0 {
		writeBufSize = opts.WriteBufferSize
	}
	if opts.ReadBufferSize > 0 {
		readBufSize = opts.ReadBufferSize
	}
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		conn:                  conn,
		cancel:                cancel,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		scheme:                scheme,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: icwz},
		activeStreams:         make(map[uint32]*Stream),
		kp:                    kp,
		keepaliveEnabled:      keepaliveEnabled,
		initialWindowSize:     initialWindowSize,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		onGoAway:              onGoAway,
		onClose:               onClose,
		bufferPool:            newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.ctx.Done())
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
	task := &closeStreamTask{t: t}
	t.onClose = func() {
		onClose()
		// when http2Client has been closed, remove this task
		ticker.Delete(task)
	}
	ticker.Add(task)

	// Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	if npconn, ok := t.conn.(netpoll.Connection); ok {
		// For netpoll connections the reader is installed as the OnRequest
		// callback instead of being started in a goroutine here.
		npconn.SetOnRequest(func(ctx context.Context, connection netpoll.Connection) error {
			t.reader()
			return nil
		})
	} else {
		gofunc.RecoverGoFuncWithInfo(ctx, t.reader, gofunc.NewBasicInfo(remoteService, conn.RemoteAddr().String()))
	}

	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		gofunc.RecoverGoFuncWithInfo(ctx, t.keepalive, gofunc.NewBasicInfo(remoteService, conn.RemoteAddr().String()))
	}

	// Send connection preface to server.
	n, err := t.conn.Write(ClientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		t.Close(err)
		return nil, err
	}
	if n != ClientPrefaceLen {
		err = connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, ClientPrefaceLen)
		t.Close(err)
		return nil, err
	}

	ss := []http2.Setting{
		{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		},
	}
	err = t.framer.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		t.Close(err)
		return nil, err
	}

	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			t.Close(err)
			return nil, err
		}
	}

	if err := t.framer.writer.Flush(); err != nil {
		// Tear the transport down like every other failure after construction;
		// otherwise the closeStreamTask stays registered in the shared ticker
		// and the connection is left open. The returned error is unchanged.
		t.Close(err)
		return nil, err
	}
	gofunc.RecoverGoFuncWithInfo(ctx, func() {
		err := t.loopy.run(conn.RemoteAddr().String())
		if err != nil {
			klog.CtxErrorf(ctx, "KITEX: grpc client loopyWriter.run returning, error=%v", err)
		}
		// If it's a connection error, let reader goroutine handle it
		// since there might be data in the buffers.
		if _, ok := err.(net.Error); !ok {
			t.conn.Close()
		}
		close(t.writerDone)
	}, gofunc.NewBasicInfo(remoteService, conn.RemoteAddr().String()))

	return t, nil
}

// closeStreamTask is used to clean up streams that have been actively cancelled by users.
// One task is created per http2Client and registered with the shared ticker.
// t is the transport whose active streams are inspected; toCloseStreams is a
// scratch slice reused across Tick calls to hold the streams found cancelled.
type closeStreamTask struct {
	t              *http2Client
	toCloseStreams []*Stream
}

// Tick scans the transport's active streams and closes every stream whose
// context is already done, resetting it with http2.ErrCodeCancel.
// The scan runs with the transport mutex held; the actual closing happens
// after the mutex is released because closeStream must not be called with
// t.mu held.
func (task *closeStreamTask) Tick() {
	tr := task.t

	// Collect the cancelled streams under the lock.
	tr.mu.Lock()
	for _, s := range tr.activeStreams {
		select {
		case <-s.Context().Done():
			// The stream's context has been cancelled or has expired.
			task.toCloseStreams = append(task.toCloseStreams, s)
		default:
		}
	}
	tr.mu.Unlock()

	// Close them outside the lock, dropping each reference as we go so the
	// reused scratch slice does not keep closed streams alive.
	for idx := range task.toCloseStreams {
		s := task.toCloseStreams[idx]
		// uniformly converted to status error
		ctxErr := ContextErr(s.Context().Err())
		tr.closeStream(s, ctxErr, true, http2.ErrCodeCancel, status.Convert(ctxErr), nil, false)
		task.toCloseStreams[idx] = nil
	}
	task.toCloseStreams = task.toCloseStreams[:0]
}

// preAllocatedStreamFields bundles the per-stream objects that are allocated
// ahead of time by preallocateForStream and installed into a Stream by
// fillStreamFields.
type preAllocatedStreamFields struct {
	// recvBuffer becomes the stream's receive buffer (Stream.buf).
	recvBuffer *recvBuffer
	// writeQuota becomes the stream's write quota (Stream.wq).
	writeQuota *writeQuota
}

var (
	// preallocateChan holds up to 256 pre-built field sets produced by
	// preallocateForStream and consumed by fillStreamFields.
	preallocateChan = make(chan preAllocatedStreamFields, 256)
	// preallocateInit makes sure the producer goroutine is started only once.
	preallocateInit sync.Once
)

// fillStreamFields installs a receive buffer and a write quota into s.
// It prefers a pre-allocated pair from preallocateChan and allocates a fresh
// pair itself when none is ready, so it never blocks on the producer.
// The first call lazily starts the producer goroutine.
func fillStreamFields(s *Stream) {
	preallocateInit.Do(func() { go preallocateForStream() })

	var pre preAllocatedStreamFields
	select {
	case ready := <-preallocateChan:
		pre = ready
	default:
		// won't block even the producer is not fast enough
		pre = allocateStreamFields()
	}

	s.buf, s.wq = pre.recvBuffer, pre.writeQuota
	// The quota was created without a done channel; bind it to this stream.
	s.wq.done = s.done
}

// preallocateForStream keeps preallocateChan filled with ready-to-use stream
// fields. It loops forever, blocking whenever the channel is full. A panic is
// recovered and logged, after which the function returns.
func preallocateForStream() {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		klog.Warnf("grpc.preallocateForStream panic, error=%v, stack=%s", r, debug.Stack())
	}()
	for {
		fields := allocateStreamFields()
		preallocateChan <- fields
	}
}

// allocateStreamFields builds a new receive buffer and a write quota of
// defaultWriteQuota. The quota is created with a nil done channel; the
// consumer is expected to set it (see fillStreamFields).
func allocateStreamFields() preAllocatedStreamFields {
	var fields preAllocatedStreamFields
	fields.recvBuffer = newRecvBuffer()
	fields.writeQuota = newWriteQuota(defaultWriteQuota, nil)
	return fields
}

// newStream builds a Stream bound to this transport for the call described by
// callHdr. The stream gets its receive buffer and write quota from
// fillStreamFields, plus the reader and flow-control callbacks that route
// back into the transport. The stream is neither assigned an ID nor
// registered as active here.
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
	s := new(Stream)
	s.ct = t
	s.ctx = ctx
	s.method = callHdr.Method
	s.sendCompress = callHdr.SendCompress
	s.contentSubtype = callHdr.ContentSubtype
	s.done = make(chan struct{})
	s.headerChan = make(chan struct{})

	// Needs s.done to be set already: the write quota is bound to it.
	fillStreamFields(s)

	bufReader := &recvBufferReader{
		ctx:     s.ctx,
		ctxDone: s.ctx.Done(),
		recv:    s.buf,
		closeStream: func(err error) {
			t.CloseStream(s, err)
		},
		freeBuffer: t.bufferPool.put,
	}
	s.trReader = &transportReader{
		reader: bufReader,
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	return s
}

// TODO: mesh headers
//
// createHeaderFields assembles the request header list for a new stream.
// The fixed fields come first (:method, :scheme, :path, :authority,
// content-type, user-agent, te), followed by the optional retry counter,
// compression headers, grpc-timeout (when ctx has a deadline) and finally the
// outgoing metadata found in ctx. Reserved header names in the metadata are
// skipped.
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) []hpack.HeaderField {
	const fixedFields = 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
	fields := make([]hpack.HeaderField, 0, fixedFields)
	add := func(name, value string) {
		fields = append(fields, hpack.HeaderField{Name: name, Value: value})
	}

	add(":method", "POST")
	add(":scheme", t.scheme)
	add(":path", callHdr.Method)
	add(":authority", callHdr.Host)
	add("content-type", contentType(callHdr.ContentSubtype))
	add("user-agent", defaultUserAgent)
	add("te", "trailers")

	if attempts := callHdr.PreviousAttempts; attempts > 0 {
		add("grpc-previous-rpc-attempts", strconv.Itoa(attempts))
	}
	if enc := callHdr.SendCompress; enc != "" {
		add("grpc-encoding", enc)
		add("grpc-accept-encoding", enc)
	}
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		// Send out timeout regardless its value. The server can detect timeout context by itself.
		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
		add("grpc-timeout", encodeTimeout(time.Until(deadline)))
	}

	md, added, hasMD := metadata.FromOutgoingContextRaw(ctx)
	if !hasMD {
		return fields
	}
	for key, values := range md {
		// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
		if isReservedHeader(key) {
			continue
		}
		for _, value := range values {
			add(key, encodeMetadataHeader(key, value))
		}
	}
	// Each entry of added is a flat key/value list: even positions are keys,
	// odd positions are the values that belong to the preceding key.
	var key string
	for _, kvs := range added {
		for pos, item := range kvs {
			if pos%2 == 0 {
				key = item
				continue
			}
			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
			if isReservedHeader(key) {
				continue
			}
			add(strings.ToLower(key), encodeMetadataHeader(key, item))
		}
	}
	return fields
}

// setPeer records the transport's remote address on the peer stored in ctx,
// if there is one. Nothing happens when ctx carries no peer.
func (t *http2Client) setPeer(ctx context.Context) {
	if p, found := peer.GetPeerFromContext(ctx); found {
		p.Addr = t.remoteAddr
	}
}

// NewStream creates a stream and registers it into the transport as "active"
// streams.
//
// The header frame for the stream is handed to controlBuf together with two
// admission checks (header list size and stream quota). If no quota is
// available the call waits until quota is signalled, the stream context is
// done, a GOAWAY was received, or the transport context is done.
//
// Returns the new stream, or an error when the header list exceeds the limit
// announced by the server, the context ends, the transport is draining or
// closing, or controlBuf rejects the item.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
	t.setPeer(ctx)
	s := t.newStream(ctx, callHdr)
	// cleanup finishes a stream that never made it to the server: it marks the
	// stream done and unprocessed, delivers err to the reader and closes the
	// done/header channels. It is a no-op when the stream is already done.
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
			close(s.headerChan)
		}
	}
	hdr := &headerFrame{
		hf:        t.createHeaderFields(ctx, callHdr),
		endStream: false,
		// initStream re-checks the transport state; presumably it is invoked by
		// the loopy writer when it picks up the header frame — confirm there.
		initStream: func(id uint32) error {
			t.mu.Lock()
			if state := t.state; state != reachable {
				t.mu.Unlock()
				// Do a quick cleanup.
				err := error(errStreamDrain)
				if state == closing {
					err = ErrConnClosing
				}
				cleanup(err)
				return err
			}
			// If the keepalive goroutine has gone dormant, wake it up.
			if t.kpDormant {
				t.kpDormancyCond.Signal()
			}
			t.mu.Unlock()
			return nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	firstTry := true
	// ch is the channel to wait on when no stream quota is available.
	var ch chan struct{}
	// checkForStreamQuota reserves one unit of quota, assigns the next stream
	// ID and registers the stream as active. It reports false when no quota is
	// left or when the transport is draining/closed.
	checkForStreamQuota := func(it interface{}) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2
		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		t.mu.Lock()
		// Don't create a stream if the transport is in a state of graceful shutdown or already closed
		// NOTE(review): on this path the quota and the stream ID taken above
		// are not given back, and ch may still be nil, in which case the wait
		// below only ends via the contexts or goAway — confirm this is intended.
		if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
			t.mu.Unlock()
			return false
		}
		t.activeStreams[s.id] = s
		t.mu.Unlock()
		// Pass the wake-up on if there is still quota and someone is waiting.
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var hdrListSizeErr error
	// checkForHeaderListSize rejects the header frame when the summed size of
	// its fields exceeds the limit recorded in maxSendHeaderListSize.
	checkForHeaderListSize := func(it interface{}) bool {
		if t.maxSendHeaderListSize == nil {
			return true
		}
		hdrFrame := it.(*headerFrame)
		var sz int64
		for _, f := range hdrFrame.hf {
			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	// Retry until the header frame is accepted or a terminal condition is hit.
	for {
		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
			return checkForHeaderListSize(it) && checkForStreamQuota(it)
		}, hdr)
		if err != nil {
			return nil, err
		}
		if success {
			break
		}
		if hdrListSizeErr != nil {
			return nil, hdrListSizeErr
		}
		firstTry = false
		select {
		case <-ch:
		case <-s.ctx.Done():
			return nil, ContextErr(s.ctx.Err())
		case <-t.goAway:
			return nil, errStreamDrain
		case <-t.ctx.Done():
			return nil, ErrConnClosing
		}
	}
	return s, nil
}

// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
//
// A nil err closes the stream without sending RST_STREAM. A non-nil err makes
// the transport reset the stream: with gracefulShutdownCode when err matches
// errGracefulShutdown, otherwise with http2.ErrCodeCancel.
func (t *http2Client) CloseStream(s *Stream, err error) {
	sendRST := err != nil
	var code http2.ErrCode
	if sendRST {
		code = http2.ErrCodeCancel
		if errors.Is(err, errGracefulShutdown) {
			code = gracefulShutdownCode
		}
		klog.CtxInfof(s.ctx, "KITEX: stream closed by ctx canceled, err: %v, rstCode: %d"+sendRSTStreamFrameSuffix, err, code)
	}
	t.closeStream(s, err, sendRST, code, status.Convert(err), nil, false)
}

// closeStream finalizes s: it records the final status and trailers, unblocks
// readers and writers, and queues a cleanup item that removes the stream from
// activeStreams (optionally sending RST_STREAM with rstCode when rst is true).
// Only the first call for a stream does the work; concurrent or later calls
// wait for s.done and return.
//
// before invoking closeStream, pls do not hold the t.mu
// because accessing the controlbuf while holding t.mu will cause a deadlock.
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return.  If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read it after it sees an io.EOF error from read or write and we'll write those errors
	// only after updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}

	// store closeStreamErr
	// For a normal end (io.EOF or nil) the error derived from the status is
	// stored instead; nothing is stored when that is nil as well.
	storeErr := err
	if err == io.EOF || err == nil {
		storeErr = st.Err()
	}
	if storeErr != nil {
		s.closeStreamErr.Store(storeErr)
	}

	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.noHeaders = true
		close(s.headerChan)
	}
	// The stream is dropped from activeStreams only in onWrite, i.e. when the
	// cleanup item is processed, not right here.
	cleanup := &cleanupStream{
		streamID: s.id,
		onWrite: func() {
			t.mu.Lock()
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
		},
		rst:     rst,
		rstCode: rstCode,
	}
	// Return the stream's quota and wake one waiter in NewStream, if any.
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	// The result of executeAndPut is deliberately not checked here.
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
}

// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
//
// err is handed to controlBuf.finish and to every stream that was still
// active. The return value is the error from closing the underlying
// connection; a repeated call returns nil without doing anything.
func (t *http2Client) Close(err error) error {
	t.mu.Lock()
	// Make sure we only Close once.
	if t.state == closing {
		t.mu.Unlock()
		return nil
	}
	// Call t.onClose before setting the state to closing to prevent the client
	// from attempting to create new streams ASAP.
	// Note that onClose runs while t.mu is still held.
	if t.onClose != nil {
		t.onClose()
	}
	t.state = closing
	// Detach the stream map so it can be walked after the lock is released;
	// a nil activeStreams also tells NewStream/closeStream the transport is closed.
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable, we
		// should unblock it so that the goroutine eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish(err)
	t.cancel()
	cErr := t.conn.Close()

	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
	}
	return cErr
}

// GracefulClose moves the transport into the draining state so that no new
// streams are admitted. With no active streams the transport is closed right
// away; otherwise an incomingGoAway item is queued on controlBuf and the
// transport is closed once the last active stream has finished. Calling it on
// a transport that is already draining or closing is a no-op.
func (t *http2Client) GracefulClose() {
	t.mu.Lock()
	// Make sure we move to draining only from active.
	switch t.state {
	case draining, closing:
		t.mu.Unlock()
		return
	}
	t.state = draining
	remaining := len(t.activeStreams)
	t.mu.Unlock()

	if remaining > 0 {
		t.controlBuf.put(&incomingGoAway{})
		return
	}
	t.Close(connectionErrorfWithIgnorable(true, nil, "no active streams left to process while draining"))
}

// Write queues hdr and data as an HTTP2 data frame for stream s. The caller
// should proceed only if Write returns nil.
//
// When opts.Last is set the stream is switched from active to write-done
// first. If the stream is not active, or its write quota cannot be obtained,
// the stream's recorded close error is returned instead. Quota is only
// requested when at least one of hdr and data is non-nil.
func (t *http2Client) Write(s *Stream, hdr, data []byte, opts *Options) error {
	switch {
	case opts.Last:
		// If it's the last message, update stream state.
		if !s.compareAndSwapState(streamActive, streamWriteDone) {
			return s.getCloseStreamErr()
		}
	case s.getState() != streamActive:
		return s.getCloseStreamErr()
	}

	frame := newDataFrame()
	frame.streamID = s.id
	frame.endStream = opts.Last
	frame.h, frame.d = hdr, data
	frame.originH, frame.originD = frame.h, frame.d

	// An empty data frame does not consume any quota.
	if hdr == nil && data == nil {
		return t.controlBuf.put(frame)
	}
	if quotaErr := s.wq.get(int32(len(hdr) + len(data))); quotaErr != nil {
		return s.getCloseStreamErr()
	}
	return t.controlBuf.put(frame)
}

// getStream looks up the active stream that frame f belongs to, using the
// stream ID from the frame header. It returns nil when no such stream is
// registered.
func (t *http2Client) getStream(f http2.Frame) *Stream {
	id := f.Header().StreamID
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.activeStreams[id]
}

// adjustWindow asks the stream's inbound flow control whether a read of n
// bytes needs more window than currently granted, and queues a stream-level
// window update for the returned amount when it is positive.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
	if extra := s.fc.maybeAdjust(n); extra > 0 {
		update := &outgoingWindowUpdate{streamID: s.id, increment: extra}
		t.controlBuf.put(update)
	}
}

// updateFlowControl applies a new inbound window size n, as produced by the
// bdp estimation: the connection-level limit is raised and a connection
// window update is queued, every active stream gets the new limit, and the
// new initial window size is announced to the peer through a SETTINGS frame.
func (t *http2Client) updateFlowControl(n uint32) {
	// Stream ID 0 addresses the connection as a whole.
	connUpdate := &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}
	applyToStreams := func(interface{}) bool {
		t.initialWindowSize = n
		t.mu.Lock()
		for _, s := range t.activeStreams {
			s.fc.newLimit(n)
		}
		t.mu.Unlock()
		return true
	}
	t.controlBuf.executeAndPut(applyToStreams, connUpdate)

	settings := []http2.Setting{
		{ID: http2.SettingInitialWindowSize, Val: n},
	}
	t.controlBuf.put(&outgoingSettings{ss: settings})
}

// updateWindow reports n bytes read by the application to the stream's
// inbound flow control. When that yields a positive amount, a stream-level
// window update for it is queued.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if grant := s.fc.onRead(n); grant > 0 {
		update := &outgoingWindowUpdate{streamID: s.id, increment: grant}
		t.controlBuf.put(update)
	}
}

// handleData processes an incoming DATA frame. It first accounts the frame
// against the connection-level flow control (and the bdp estimator, if any),
// then dispatches the payload to the owning stream, and finally closes the
// stream when the frame carries END_STREAM. Frames for unknown streams only
// affect connection-level accounting.
func (t *http2Client) handleData(f *grpcframe.DataFrame) {
	frameLen := f.Header().Length
	needBDPPing := t.bdpEst != nil && t.bdpEst.add(frameLen)

	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if inc := t.fc.onData(frameLen); inc > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: 0, increment: inc})
	}
	if needBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if inc := t.fc.reset(); inc > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{streamID: 0, increment: inc})
		}
		t.controlBuf.put(bdpPing)
	}

	// Select the right stream to dispatch.
	s := t.getStream(f)
	if s == nil {
		return
	}
	if frameLen > 0 {
		if err := s.fc.onData(frameLen); err != nil {
			klog.CtxInfof(s.ctx, "KITEX: http2Client.handleData inflow control err: %v, rstCode: %d"+sendRSTStreamFrameSuffix, err, http2.ErrCodeFlowControl)
			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
			return
		}
		payload := f.Data()
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			// The part of the frame that is not payload is never read by the
			// application, so it is reported as read right away.
			if inc := s.fc.onRead(frameLen - uint32(len(payload))); inc > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, inc})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(payload) > 0 {
			buf := t.bufferPool.get()
			buf.Reset()
			buf.Write(payload)
			s.write(recvMsg{buffer: buf})
		}
	}
	// The server has closed the stream without sending trailers.  Record that
	// the read direction is closed, and set the status appropriately.
	if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}

// handleRSTStream closes the stream addressed by an incoming RST_STREAM frame.
// The HTTP2 error code is translated into a gRPC status code through
// http2ErrConvTab (codes.Unknown when unmapped). A Canceled result is turned
// into DeadlineExceeded when the stream's own deadline has already passed.
// Frames for unknown streams are ignored.
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
	s := t.getStream(f)
	if s == nil {
		return
	}
	if f.ErrCode == http2.ErrCodeRefusedStream {
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
	}

	code, mapped := http2ErrConvTab[f.ErrCode]
	if !mapped {
		klog.Warnf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received nhttp2 error %v", f.ErrCode)
		code = codes.Unknown
	}
	if code == codes.Canceled {
		// Our deadline was already exceeded, and that was likely the cause
		// of this cancelation.  Alter the status code accordingly.
		if deadline, has := s.ctx.Deadline(); has && !deadline.After(time.Now()) {
			code = codes.DeadlineExceeded
		}
	}

	msg := gracefulShutdownMsg
	if f.ErrCode != gracefulShutdownCode {
		msg = fmt.Sprintf("stream terminated by RST_STREAM with error code: %v", f.ErrCode)
	}
	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(code, msg), nil, false)
}

// handleSettings processes a SETTINGS frame from the server; ACK frames are
// ignored. SETTINGS_MAX_CONCURRENT_STREAMS and SETTINGS_MAX_HEADER_LIST_SIZE
// are applied to the transport from inside a controlBuf callback, all other
// settings are forwarded as an incomingSettings item. For the first SETTINGS
// frame (isFirst) a missing max-concurrent-streams value is treated as
// math.MaxUint32.
func (t *http2Client) handleSettings(f *grpcframe.SettingsFrame, isFirst bool) {
	if f.IsAck() {
		return
	}
	var (
		maxStreams *uint32
		forwarded  []http2.Setting
		pending    []func()
	)
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxConcurrentStreams:
			limit := s.Val
			maxStreams = &limit
		case http2.SettingMaxHeaderListSize:
			pending = append(pending, func() {
				size := s.Val
				t.maxSendHeaderListSize = &size
			})
		default:
			forwarded = append(forwarded, s)
		}
		return nil
	})
	if maxStreams == nil && isFirst {
		unlimited := uint32(math.MaxUint32)
		maxStreams = &unlimited
	}
	if maxStreams != nil {
		pending = append(pending, func() {
			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
			t.maxConcurrentStreams = *maxStreams
			t.streamQuota += delta
			if delta > 0 && t.waitingStreams > 0 {
				close(t.streamsQuotaAvailable) // wake all of them up.
				t.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		})
	}
	t.controlBuf.executeAndPut(func(interface{}) bool {
		for _, apply := range pending {
			apply()
		}
		return true
	}, &incomingSettings{ss: forwarded})
}

// handlePing answers a PING frame from the server with a PING ACK carrying
// the same payload. An incoming ACK is not answered; it is passed to the bdp
// estimator when one is configured, since it may be the reply to a BDP ping.
func (t *http2Client) handlePing(f *http2.PingFrame) {
	if !f.IsAck() {
		reply := &ping{ack: true}
		copy(reply.data[:], f.Data[:])
		t.controlBuf.put(reply)
		return
	}
	// Maybe it's a BDP ping.
	if est := t.bdpEst; est != nil {
		est.calculate(f.Data)
	}
}

// handleGoAway processes a GOAWAY frame from the server. The first GOAWAY
// closes t.goAway, notifies onGoAway and moves the transport to draining.
// Every GOAWAY then closes the active streams whose ID is above the frame's
// LastStreamID (and not above the previous GOAWAY's ID), marking them as
// unprocessed. If no streams are active the transport is closed. Malformed
// frames (even non-zero LastStreamID, or an ID larger than the previous
// GOAWAY's) close the transport with a connection error.
func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		klog.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
	}
	id := f.LastStreamID
	// Streams created by this client use odd IDs (nextID starts at 1 and
	// advances by 2), so an even non-zero ID cannot be valid.
	if id > 0 && id%2 != 1 {
		t.mu.Unlock()
		t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of second GoAway we close all streams created after
	// the GoAwayId. This way streams that were in-flight while the GoAway from
	// server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
			return
		}
	default:
		t.setGoAwayReason(f)
		close(t.goAway)
		defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
		// Notify the clientconn about the GOAWAY before we set the state to
		// draining, to allow the client to stop attempting to create streams
		// before disallowing new streams on this connection.
		if t.state != draining {
			if t.onGoAway != nil {
				t.onGoAway(t.goAwayReason)
			}
			t.state = draining
		}
	}
	// All streams with IDs greater than the GoAwayId
	// and smaller than the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	t.prevGoAwayID = id
	active := len(t.activeStreams)
	if active <= 0 {
		t.mu.Unlock()
		t.Close(connectionErrorfWithIgnorable(true, nil, "received goaway and there are no active streams"))
		return
	}

	// Only collect the affected streams while the lock is held; they are
	// closed after it has been released.
	var unprocessedStream []*Stream
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			unprocessedStream = append(unprocessedStream, stream)
		}
	}
	t.mu.Unlock()

	// we should not access controlBuf with t.mu held since it will cause deadlock.
	// Pls refer to checkForStreamQuota in NewStream, it gets the controlbuf.mu and
	// wants to get the t.mu.
	for _, stream := range unprocessedStream {
		t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
	}
}

// setGoAwayReason derives t.goAwayReason from the received GoAway frame:
// GoAwayTooManyPings when the error code is ENHANCE_YOUR_CALM and the debug
// data reads "too_many_pings", GoAwayNoReason in every other case.
// The caller must hold the transport's mutex.
func (t *http2Client) setGoAwayReason(f *grpcframe.GoAwayFrame) {
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
		t.goAwayReason = GoAwayTooManyPings
		return
	}
	t.goAwayReason = GoAwayNoReason
}

// GetGoAwayReason returns the current value of t.goAwayReason, read while
// holding the transport's mutex.
func (t *http2Client) GetGoAwayReason() GoAwayReason {
	t.mu.Lock()
	reason := t.goAwayReason
	t.mu.Unlock()
	return reason
}

// handleWindowUpdate forwards a received WINDOW_UPDATE frame to the control
// buffer as an incomingWindowUpdate carrying the frame's stream ID and
// window increment.
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	update := &incomingWindowUpdate{
		streamID:  f.Header().StreamID,
		increment: f.Increment,
	}
	t.controlBuf.put(update)
}

// operateHeaders applies a decoded HEADERS frame to the stream it belongs to.
//
// If getStream yields no stream for the frame, the frame is ignored. The first
// HEADERS frame accepted on a stream closes s.headerChan; a frame with
// END_STREAM set closes the stream with io.EOF and the decoded status.
func (t *http2Client) operateHeaders(frame *grpcframe.MetaHeadersFrame) {
	strm := t.getStream(frame)
	if strm == nil {
		return
	}
	endStream := frame.StreamEnded()
	atomic.StoreUint32(&strm.bytesReceived, 1)
	// A non-zero headerChanClosed means a HEADERS frame was already accepted
	// on this stream.
	headerSeen := atomic.LoadUint32(&strm.headerChanClosed) != 0

	if headerSeen && !endStream {
		// As specified by gRPC over HTTP2, a HEADERS frame (and associated
		// CONTINUATION frames) can only appear at the start or end of a
		// stream, so a second HEADERS frame must have the EOS bit set.
		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		klog.CtxInfof(strm.ctx, "KITEX: http2Client.operateHeaders received HEADERS frame in the middle of a stream, rstCode: %d"+sendRSTStreamFrameSuffix, http2.ErrCodeProtocol)
		t.closeStream(strm, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
		return
	}

	ds := new(decodeState)
	// If gRPC response headers were already received, the peer is known to be
	// speaking gRPC, so decoding starts out in gRPC mode.
	ds.data.isGRPC = headerSeen
	if err := ds.decodeHeader(frame); err != nil {
		klog.CtxInfof(strm.ctx, "KITEX: http2Client.operateHeaders decode HEADERS frame failed, err: %v, rstCode: %d"+sendRSTStreamFrameSuffix, err, http2.ErrCodeProtocol)
		t.closeStream(strm, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
		return
	}

	// Only the goroutine that wins this swap publishes the header fields and
	// closes headerChan.
	if atomic.CompareAndSwapUint32(&strm.headerChanClosed, 0, 1) {
		strm.headerValid = true
		if endStream {
			// The HEADERS frame block carries a Trailers-Only response.
			strm.noHeaders = true
		} else {
			// No extra synchronization is needed for these fields: the stream
			// goroutine reads them only after it has observed headerChan
			// closed, which happens right below.
			strm.recvCompress = ds.data.encoding
			if len(ds.data.mdata) > 0 {
				strm.header = ds.data.mdata
			}
		}
		close(strm.headerChan)
	}

	if !endStream {
		return
	}

	// If the server ended the stream while it was still active on the client
	// side, an RST_STREAM is sent as part of closing it.
	rst := strm.getState() == streamActive
	strm.SetBizStatusErr(ds.bizStatusErr())
	t.closeStream(strm, io.EOF, rst, http2.ErrCodeNo, ds.status(), ds.data.mdata, true)
}

// reader runs as a separate goroutine and is in charge of reading frames from
// the network connection and dispatching them to the matching handler.
//
// The first frame must be a SETTINGS frame; otherwise the transport is
// closed. Afterwards frames are read in a loop until a non-stream error
// occurs, at which point the transport is closed and the goroutine exits.
// t.readerDone is closed on exit.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
	defer close(t.readerDone)

	// Check the validity of server preface.
	preface, err := t.framer.ReadFrame()
	if err != nil {
		// Close kicks off resetTransport, so it must be last before return.
		t.Close(connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err))
		return
	}
	// The settings frame arrived in time, so the read deadline is reset.
	t.conn.SetReadDeadline(time.Time{})
	if t.keepaliveEnabled {
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	}
	settings, isSettings := preface.(*grpcframe.SettingsFrame)
	if !isSettings {
		// Close kicks off resetTransport, so it must be last before return.
		t.Close(connectionErrorf(true, err, "first frame received is not a setting frame"))
		return
	}
	t.handleSettings(settings, true)

	// Keep reading incoming frames on this transport.
	for {
		t.controlBuf.throttle()
		incoming, readErr := t.framer.ReadFrame()
		if t.keepaliveEnabled {
			atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
		}
		if readErr != nil {
			streamErr, isStreamErr := readErr.(http2.StreamError)
			if !isStreamErr {
				// Transport error.
				t.Close(connectionErrorf(true, readErr, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), readErr))
				return
			}
			// The framer returns an http2.StreamError only when the server's
			// response is malformed http2; abort the affected stream if it is
			// still active and keep the connection.
			t.mu.Lock()
			s := t.activeStreams[streamErr.StreamID]
			t.mu.Unlock()
			if s != nil {
				// Use the framer's error detail to provide a better message.
				code := http2ErrConvTab[streamErr.Code]
				var msg string
				if detail := t.framer.ErrorDetail(); detail != nil {
					msg = detail.Error()
				}
				klog.CtxInfof(s.ctx, "KITEX: http2Client.reader encountered http2.StreamError: %v, rstCode: %d"+sendRSTStreamFrameSuffix, streamErr, http2.ErrCodeProtocol)
				t.closeStream(s, status.New(code, msg).Err(), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
			}
			continue
		}

		switch f := incoming.(type) {
		case *grpcframe.MetaHeadersFrame:
			t.operateHeaders(f)
		case *grpcframe.DataFrame:
			t.handleData(f)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(f)
		case *grpcframe.SettingsFrame:
			t.handleSettings(f, false)
		case *http2.PingFrame:
			t.handlePing(f)
		case *grpcframe.GoAwayFrame:
			t.handleGoAway(f)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(f)
		default:
			klog.Warnf("transport: http2Client.reader got unhandled frame type %v.", f)
		}
		t.framer.reader.Release()
	}
}

// minTime returns the smaller of the two durations a and b.
func minTime(a, b time.Duration) time.Duration {
	if b <= a {
		return b
	}
	return a
}

// keepalive, running in a separate goroutine, makes sure the connection is
// alive by sending pings.
//
// It returns when t.ctx is done, when the transport is found in the closing
// state, or after it closes the transport itself because a ping was sent and
// no read activity was observed before kp.Timeout elapsed.
func (t *http2Client) keepalive() {
	// The same ping item is queued on the control buffer for every keepalive ping.
	p := &ping{data: [8]byte{}}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	timeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were here.
				outstandingPing = false
				// Next timer should fire at kp.Time seconds from lastRead time.
				timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No read activity since the ping was sent and its timeout budget
			// is used up: give up on the connection.
			if outstandingPing && timeoutLeft <= 0 {
				t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
				return
			}
			t.mu.Lock()
			if t.state == closing {
				// If the transport is closing, we should exit from the
				// keepalive goroutine here. If not, we could have a race
				// between the call to Signal() from Close() and the call to
				// Wait() here, whereby the keepalive goroutine ends up
				// blocking on the condition variable which will never be
				// signalled again.
				t.mu.Unlock()
				return
			}
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// If a ping was sent out previously (because there were active
				// streams at that point) which wasn't acked and its timeout
				// hadn't fired, but we got here and are about to go dormant,
				// we should make sure that we unconditionally send a ping once
				// we awaken.
				outstandingPing = false
				t.kpDormant = true
				t.kpDormancyCond.Wait()
			}
			t.kpDormant = false
			t.mu.Unlock()

			// We get here either because we were dormant and a new stream was
			// created which unblocked the Wait() call, or because the
			// keepalive timer expired. In both cases, we need to send a ping.
			if !outstandingPing {
				t.controlBuf.put(p)
				timeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, timeoutLeft)
			timeoutLeft -= sleepDuration
			timer.Reset(sleepDuration)
		case <-t.ctx.Done():
			// Stop the timer; if it has already fired, drain its channel
			// before returning.
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}

// Error returns a channel that is closed once the transport's context
// (t.ctx) is done.
func (t *http2Client) Error() <-chan struct{} {
	done := t.ctx.Done()
	return done
}

// GoAway returns the transport's goAway channel, which handleGoAway closes
// when a GOAWAY frame is received from the server.
func (t *http2Client) GoAway() <-chan struct{} {
	goAwayCh := t.goAway
	return goAwayCh
}

// RemoteAddr returns the remote address recorded on the transport.
func (t *http2Client) RemoteAddr() net.Addr {
	return t.remoteAddr
}

// LocalAddr returns the local address recorded on the transport.
func (t *http2Client) LocalAddr() net.Addr {
	return t.localAddr
}

// IsActive reports whether the transport is currently in the reachable
// state. The state is read while holding the transport's mutex.
func (t *http2Client) IsActive() bool {
	t.mu.Lock()
	current := t.state
	t.mu.Unlock()
	return current == reachable
}
